{{- if .Values.provisioning.enabled }}
{{- $replicaCount := int .Values.replicaCount }}
kind: Job
apiVersion: batch/v1
metadata:
  name: {{ printf "%s-provisioning" (include "common.names.fullname" .) }}
  namespace: {{ .Release.Namespace | quote }}
  labels: {{- include "common.labels.standard" . | nindent 4 }}
    app.kubernetes.io/component: kafka-provisioning
    {{- if .Values.commonLabels }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
    {{- end }}
  annotations:
    helm.sh/hook: post-install,post-upgrade
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded
    {{- if .Values.commonAnnotations }}
    {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
    {{- end }}
spec:
  template:
    metadata:
      labels: {{- include "common.labels.standard" . | nindent 8 }}
        app.kubernetes.io/component: kafka-provisioning
        {{- if .Values.provisioning.podLabels }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podLabels "context" $) | nindent 8 }}
        {{- end }}
      annotations:
        {{- if .Values.provisioning.podAnnotations }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.podAnnotations "context" $) | nindent 8 }}
        {{- end }}
    spec:
      serviceAccountName: {{ template "kafka.provisioning.serviceAccountName" . }}
      {{- include "kafka.imagePullSecrets" . | nindent 6 }}
      {{- if .Values.provisioning.schedulerName }}
      schedulerName: {{ .Values.provisioning.schedulerName | quote }}
      {{- end }}
      {{- if .Values.provisioning.podSecurityContext.enabled }}
      securityContext: {{- omit .Values.provisioning.podSecurityContext "enabled" | toYaml | nindent 8 }}
      {{- end }}
      restartPolicy: OnFailure
      terminationGracePeriodSeconds: 0
      {{- if .Values.provisioning.nodeSelector }}
      nodeSelector: {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.nodeSelector "context" $) | nindent 8 }}
      {{- end }}
      {{- if .Values.provisioning.tolerations }}
      tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.tolerations "context" .) | nindent 8 }}
      {{- end }}
      {{- if or .Values.provisioning.initContainers .Values.provisioning.waitForKafka }}
      initContainers:
        {{- if .Values.provisioning.waitForKafka }}
        - name: wait-for-available-kafka
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          command:
            - /bin/bash
          args:
            - -ec
            - |
              wait-for-port \
                --host={{ include "common.names.fullname" . }} \
                --state=inuse \
                --timeout=120 \
                {{ .Values.service.ports.client | int64 }};
              echo "Kafka is available";
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
        {{- end }}
        {{- if .Values.provisioning.initContainers }}
        {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.initContainers "context" $ ) | nindent 8 }}
        {{- end }}
      {{- end }}
      containers:
        - name: kafka-provisioning
          image: {{ include "kafka.image" . }}
          imagePullPolicy: {{ .Values.image.pullPolicy | quote }}
          {{- if .Values.provisioning.containerSecurityContext.enabled }}
          securityContext: {{- omit .Values.provisioning.containerSecurityContext "enabled" | toYaml | nindent 12 }}
          {{- end }}
          {{- if .Values.diagnosticMode.enabled }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.command "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.command }}
          command: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.command "context" $) | nindent 12 }}
          {{- else }}
          command:
            - /bin/bash
          {{- end }}
          {{- if .Values.diagnosticMode.enabled }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.diagnosticMode.args "context" $) | nindent 12 }}
          {{- else if .Values.provisioning.args }}
          args: {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.args "context" $) | nindent 12 }}
          {{- else }}
          args:
            - -ec
            - |
              echo "Configuring environment"
              . /opt/bitnami/scripts/libkafka.sh
              export CLIENT_CONF="${CLIENT_CONF:-/opt/bitnami/kafka/config/client.properties}"
              if [ ! -f "$CLIENT_CONF" ]; then
                touch $CLIENT_CONF

                kafka_common_conf_set "$CLIENT_CONF" security.protocol {{ include "kafka.listenerType" ( dict "protocol" .Values.auth.clientProtocol ) | quote }}
                {{- if (include "kafka.client.tlsEncryption" .) }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.type {{ upper .Values.provisioning.auth.tls.type | quote }}
                ! is_empty_value "$KAFKA_CLIENT_KEY_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.key.password "$KAFKA_CLIENT_KEY_PASSWORD"
                {{- if eq (upper .Values.provisioning.auth.tls.type) "PEM" }}
                file_to_multiline_property() {
                    awk 'NR > 1{print line" \\"}{line=$0;}END{print $0" "}' <"${1:?missing file}"
                }
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.key "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.key }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.certificate.chain "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.caCert }}")"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.certificates "$(file_to_multiline_property "/certs/{{ .Values.provisioning.auth.tls.cert }}")"
                {{- else if eq (upper .Values.provisioning.auth.tls.type) "JKS" }}
                kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.location "/certs/{{ .Values.provisioning.auth.tls.keystore }}"
                kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.location "/certs/{{ .Values.provisioning.auth.tls.truststore }}"
                ! is_empty_value "$KAFKA_CLIENT_KEYSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.keystore.password "$KAFKA_CLIENT_KEYSTORE_PASSWORD"
                ! is_empty_value "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD" && kafka_common_conf_set "$CLIENT_CONF" ssl.truststore.password "$KAFKA_CLIENT_TRUSTSTORE_PASSWORD"
                {{- end }}
                {{- end }}
                {{- if (include "kafka.client.saslAuthentication" .) }}
                {{- if contains "plain" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism PLAIN
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-256" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-256
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- else if contains "scram-sha-512" .Values.auth.sasl.mechanisms }}
                kafka_common_conf_set "$CLIENT_CONF" sasl.mechanism SCRAM-SHA-512
                kafka_common_conf_set "$CLIENT_CONF" sasl.jaas.config "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$SASL_USERNAME\" password=\"$SASL_USER_PASSWORD\";"
                {{- end }}
                {{- end }}
              fi

              echo "Running pre-provisioning script if any given"
              {{ .Values.provisioning.preScript | nindent 14 }}

              kafka_provisioning_commands=(
              {{- range $topic := .Values.provisioning.topics }}
                "/opt/bitnami/kafka/bin/kafka-topics.sh \
                    --create \
                    --if-not-exists \
                    --bootstrap-server ${KAFKA_SERVICE} \
                    --replication-factor {{ $topic.replicationFactor | default $.Values.provisioning.replicationFactor }} \
                    --partitions {{ $topic.partitions | default $.Values.provisioning.numPartitions }} \
                    {{- range $name, $value := $topic.config }}
                    --config {{ $name }}={{ $value }} \
                    {{- end }}
                    --command-config ${CLIENT_CONF} \
                    --topic {{ $topic.name }}"
              {{- end }}
              {{- range $command := .Values.provisioning.extraProvisioningCommands }}
                {{- $command | quote | nindent 16 }}
              {{- end }}
              )

              echo "Starting provisioning"
              for ((index=0; index < ${#kafka_provisioning_commands[@]}; index+={{ .Values.provisioning.parallel }}))
              do
                for j in $(seq ${index} $((${index}+{{ .Values.provisioning.parallel }}-1)))
                do
                    ${kafka_provisioning_commands[j]} & # Async command
                done
                wait  # Wait the end of the jobs
              done

              echo "Running post-provisioning script if any given"
              {{ .Values.provisioning.postScript | nindent 14 }}

              echo "Provisioning succeeded"
          {{- end }}
          env:
            - name: BITNAMI_DEBUG
              value: {{ ternary "true" "false" (or .Values.image.debug .Values.diagnosticMode.enabled) | quote }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            - name: KAFKA_CLIENT_KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keyPasswordSecretKey }}
            - name: KAFKA_CLIENT_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.keystorePasswordSecretKey }}
            - name: KAFKA_CLIENT_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ template "kafka.client.passwordsSecretName" . }}
                  key: {{ .Values.provisioning.auth.tls.truststorePasswordSecretKey }}
            {{- end }}
            - name: KAFKA_SERVICE
              value: {{ printf "%s:%d" (include "common.names.fullname" .) (.Values.service.ports.client | int64) }}
            {{- if (include "kafka.client.saslAuthentication" .) }}
            {{- $clientUsers := .Values.auth.sasl.jaas.clientUsers }}
            - name: SASL_USERNAME
              value: {{ index $clientUsers 0 | quote }}
            - name: SASL_USER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: {{ include "kafka.jaasSecretName" . }}
                  key: system-user-password
            {{- end }}
            {{- if .Values.provisioning.extraEnvVars }}
            {{- include "common.tplvalues.render" ( dict "value" .Values.provisioning.extraEnvVars "context" $) | nindent 12 }}
            {{- end }}
          {{- if or .Values.provisioning.extraEnvVarsCM .Values.provisioning.extraEnvVarsSecret }}
          envFrom:
            {{- if .Values.provisioning.extraEnvVarsCM }}
            - configMapRef:
              name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsCM "context" $) }}
            {{- end }}
            {{- if .Values.provisioning.extraEnvVarsSecret }}
            - secretRef:
              name: {{ include "common.tplvalues.render" (dict "value" .Values.provisioning.extraEnvVarsSecret "context" $) }}
            {{- end }}
          {{- end }}
          {{- if .Values.provisioning.resources }}
          resources: {{- toYaml .Values.provisioning.resources | nindent 12 }}
          {{- end }}
          volumeMounts:
            {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
            - name: log4j-config
              mountPath: {{ .Values.persistence.mountPath }}/config/log4j.properties
              subPath: log4j.properties
            {{- end }}
            {{- if (include "kafka.client.tlsEncryption" .) }}
            {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
            - name: kafka-client-certs
              mountPath: /certs
              readOnly: true
            {{- end }}
            {{- end }}
            {{- if .Values.provisioning.extraVolumeMounts }}
            {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumeMounts "context" $) | nindent 12 }}
            {{- end }}
        {{- if .Values.provisioning.sidecars }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.sidecars "context" $) | nindent 8 }}
        {{- end }}
      volumes:
        {{- if or .Values.log4j .Values.existingLog4jConfigMap }}
        - name: log4j-config
          configMap:
            name: {{ include "kafka.log4j.configMapName" . }}
        {{ end }}
        {{- if (include "kafka.client.tlsEncryption" .) }}
        {{- if not (empty .Values.provisioning.auth.tls.certificatesSecret) }}
        - name: kafka-client-certs
          secret:
            secretName: {{ .Values.provisioning.auth.tls.certificatesSecret }}
            defaultMode: 256
        {{- end }}
        {{- end }}
        {{- if .Values.provisioning.extraVolumes }}
        {{- include "common.tplvalues.render" (dict "value" .Values.provisioning.extraVolumes "context" $) | nindent 8 }}
        {{- end }}
{{- end }}
